{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<style>div.container { width: 100% }</style>\n",
    "<img style=\"float:left;  vertical-align:text-bottom;\" height=\"65\" width=\"172\" src=\"../assets/PyViz_logo_wm_line.png\" />\n",
    "<div style=\"float:right; vertical-align:text-bottom;\"><h2>Tutorial 09. Operations and Pipelines</h2></div>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When interactively exploring a dataset you often end up interleaving visualization and analysis code. In HoloViews your visualization and your data are one and the same, so analysis and data transformations can be applied directly to the visualizable data. For that purpose HoloViews provides operations, which can be used to implement any analysis or data transformation you might want to do. Operations take a HoloViews Element and return another Element of either the same type or a new type, depending on the operation. We'll illustrate operations and pipelines using a variety of libraries:\n",
    "\n",
    "<div style=\"margin: 10px\">\n",
    "<a href=\"http://holoviews.org\"><img style=\"margin:8px; display:inline; object-fit:scale-down; max-height:150px\" src=\"../assets/holoviews.png\"/></a>\n",
    "<a href=\"http://bokeh.pydata.org\"><img style=\"margin:8px; display:inline; object-fit:scale-down; max-height:150px\" src=\"../assets/bokeh.png\"/></a>\n",
    "<a href=\"http://datashader.org\"><img style=\"margin:8px; display:inline; object-fit:scale-down; max-height:150px\" src=\"../assets/datashader.png\"/></a>\n",
    "<a href=\"http://ioam.github.io/param\"><img style=\"margin:8px; display:inline; object-fit:scale-down; max-height:150px\" src=\"../assets/param.png\"/></a><br><br>\n",
    "<a href=\"http://pandas.pydata.org\"><img style=\"margin:8px; display:inline; object-fit:scale-down; max-height:140px\" src=\"../assets/pandas.png\"/></a>\n",
    "<a href=\"http://matplotlib.org\"><img style=\"margin:8px; display:inline; object-fit:scale-down; max-height:150px\" src=\"../assets/matplotlib_wm.png\"/></a>\n",
    "<a href=\"http://numpy.org\"><img style=\"margin:8px; display:inline; object-fit:scale-down; max-height:150px\" src=\"../assets/numpy.png\"/></a>\n",
    "</div>\n",
    "\n",
    "Since Operations know about HoloViews you can apply them to large collections of data collected in HoloMap and DynamicMap containers. Since operations work on both of these containers that means they can also be applied lazily. This feature allows us to chain multiple operations in a data analysis, processing, and visualization pipeline, e.g. to drive the operation of a dashboard.\n",
    "\n",
    "Pipelines built using DynamicMap and HoloViews operations are also useful for caching intermediate results and just-in-time computations, because they lazily (re)compute just the part of the pipeline that has changed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "import param\n",
    "import numpy as np\n",
    "import holoviews as hv\n",
    "\n",
    "from holoviews.operation.timeseries import rolling, rolling_outlier_std\n",
    "from holoviews.operation.datashader import datashade, dynspread\n",
    "\n",
    "hv.extension('bokeh')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Declare some data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "In this example we'll work with a timeseries that stands in for stock-price data.  We'll define a small function to generate a random, noisy timeseries, then define a ``DynamicMap`` that will generate a timeseries for each stock symbol:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def time_series(T=1, N=100, mu=0.1, sigma=0.1, S0=20):  \n",
    "    \"\"\"Parameterized noisy time series\"\"\"\n",
    "    dt = float(T)/N\n",
    "    t = np.linspace(0, T, N)\n",
    "    W = np.random.standard_normal(size = N) \n",
    "    W = np.cumsum(W)*np.sqrt(dt) # standard brownian motion\n",
    "    X = (mu-0.5*sigma**2)*t + sigma*W \n",
    "    S = S0*np.exp(X) # geometric brownian motion\n",
    "    return S\n",
    "\n",
    "def load_symbol(symbol, **kwargs):\n",
    "    return hv.Curve(time_series(N=10000), kdims=[('time', 'Time')],\n",
    "                    vdims=[('adj_close', 'Adjusted Close')])\n",
    "\n",
    "stock_symbols = ['AAPL', 'FB', 'IBM', 'GOOG', 'MSFT']\n",
    "dmap = hv.DynamicMap(load_symbol, kdims=['Symbol']).redim.values(Symbol=stock_symbols)"
   ]
  },
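  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For reference, ``time_series`` appears to follow the usual discretization of geometric Brownian motion. Reading the code above, with step size $\\Delta t = T/N$ and independent draws $Z_i \\sim \\mathcal{N}(0, 1)$, the sampled path is approximately:\n",
    "\n",
    "$$W_{t_k} = \\sqrt{\\Delta t}\\,\\sum_{i=0}^{k} Z_i, \\qquad S_{t_k} = S_0 \\exp\\left(\\left(\\mu - \\tfrac{1}{2}\\sigma^2\\right) t_k + \\sigma W_{t_k}\\right)$$\n",
    "\n",
    "Here $t_k$ (for $k = 0, \\dots, N-1$) are the ``N`` evenly spaced points of ``np.linspace(0, T, N)``, and $\\mu$, $\\sigma$, and $S_0$ correspond to the ``mu``, ``sigma``, and ``S0`` arguments. Note that ``load_symbol`` does not use its ``symbol`` argument, so every symbol simply gets a fresh random path."
   ]
  },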
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We will start by visualizing this data as-is:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%opts Curve [width=600] {+framewise}\n",
    "dmap"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Applying an operation"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now let's start applying some operations to this data. HoloViews ships with two ready-to-use timeseries operations: the ``rolling`` operation, which applies a function over a rolling window, and a ``rolling_outlier_std`` operation that computes outlier points in a timeseries.  Specifically, ``rolling_outlier_std`` excludes points less than one sigma (standard deviation) away from the rolling mean, which is just one  example; you can trivially write your own operations that do whatever you like."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%opts Scatter (color='indianred')\n",
    "smoothed = rolling(dmap, rolling_window=30)\n",
    "outliers = rolling_outlier_std(dmap, rolling_window=30)\n",
    "smoothed * outliers"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As you can see, the operations transform the ``Curve`` element into a smoothed version and a set of ``Scatter`` points containing the outliers both with a ``rolling_window`` of 30. Since we applied the operation to a ``DynamicMap``, the operation is lazy and only computes the result when it is requested. "
   ]
  },
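  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the two operations less of a black box, here is a minimal pandas sketch of the kind of computation they perform on a single array of y-values. It is an approximation written for illustration, not the library's implementation: the helper name ``rolling_mean_and_outliers`` and the default values chosen for ``window`` and ``sigma`` are assumptions made for this example.\n",
    "\n",
    "```python\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "\n",
    "def rolling_mean_and_outliers(values, window=30, sigma=2.0):\n",
    "    # Hypothetical helper, for illustration only\n",
    "    series = pd.Series(values)\n",
    "    # Smoothed trend: mean over a centered rolling window\n",
    "    trend = series.rolling(window, center=True, min_periods=1).mean()\n",
    "    # Spread of the residuals around that trend, over the same window\n",
    "    residual = series - trend\n",
    "    spread = residual.rolling(window, center=True, min_periods=1).std()\n",
    "    # Keep only points further than `sigma` spreads from the trend\n",
    "    outliers = series[residual.abs() > sigma * spread]\n",
    "    return trend, outliers\n",
    "\n",
    "# A flat signal with a single spike at index 50\n",
    "values = np.zeros(200)\n",
    "values[50] = 10.0\n",
    "trend, outliers = rolling_mean_and_outliers(values)\n",
    "print(list(outliers.index))\n",
    "```\n",
    "\n",
    "In this sketch, raising ``sigma`` keeps fewer points as outliers, while widening ``window`` gives a smoother trend line."
   ]
  },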
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Exercise: Apply the rolling and rolling_outlier_std operations changing the rolling_window and sigma parameters"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Linking operations to streams"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Instead of supplying the parameter values for each operation explicitly as a scalar value, we can also define a ``Stream`` that will let us update our visualization dynamically. By supplying a ``Stream`` with a ``rolling_window`` parameter to both operations, we can now generate our own events on the stream and watch our visualization update each time."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "rolling_stream = hv.streams.Stream.define('rolling', rolling_window=5)\n",
    "stream = rolling_stream()\n",
    "\n",
    "rolled_dmap = rolling(dmap, streams=[stream])\n",
    "outlier_dmap = rolling_outlier_std(dmap, streams=[stream])\n",
    "rolled_dmap * outlier_dmap"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "for i in range(20, 200, 20):\n",
    "    time.sleep(0.2)\n",
    "    stream.event(rolling_window=i)"
   ]
  },
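  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A stream is an ordinary parameterized object holding the current values, which can be handy when debugging a pipeline. The sketch below relies only on ``Stream.define``, ``event``, and the ``contents`` property from ``hv.streams``, and inspects a stream before and after an event. The class name ``RollingWindow`` and the variable names are made up for this example.\n",
    "\n",
    "```python\n",
    "import holoviews as hv\n",
    "\n",
    "# Hypothetical stream class with a single parameter\n",
    "RollingWindow = hv.streams.Stream.define('RollingWindow', rolling_window=5)\n",
    "window_stream = RollingWindow()\n",
    "\n",
    "before = dict(window_stream.contents)   # current parameter values\n",
    "window_stream.event(rolling_window=20)  # update the value and notify subscribers\n",
    "after = dict(window_stream.contents)\n",
    "print(before, after)\n",
    "```\n",
    "\n",
    "Any ``DynamicMap`` that lists the stream in its ``streams`` is re-evaluated on each ``event`` call, which is what drives the loop above."
   ]
  },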
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Exercise: Create a stream to control the sigma value and add it to the outlier operation,\n",
    "#           then vary the sigma value and observe the effect"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Defining operations\n",
    "\n",
    "Defining custom Operations is also very straightforward. For instance, let's define an ``Operation`` to compute the residual between two overlaid ``Curve`` Elements. All we need to do is subclass from the ``Operation`` baseclass and define a ``_process`` method, which takes the ``Element`` or ``Overlay`` as input and returns a new ``Element``. The residual operation can then be used to subtract the y-values of the second Curve from those of the first Curve."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from holoviews.operation import Operation\n",
    "\n",
    "class residual(Operation):\n",
    "    \"\"\"\n",
    "    Subtracts two curves from one another.\n",
    "    \"\"\"\n",
    "    \n",
    "    label = param.String(default='Residual', doc=\"\"\"\n",
    "        Defines the label of the returned Element.\"\"\")\n",
    "    \n",
    "    def _process(self, element, key=None):\n",
    "        # Get first and second Element in overlay\n",
    "        el1, el2 = element.get(0), element.get(1)\n",
    "        \n",
    "        # Get x-values and y-values of curves\n",
    "        xvals  = el1.dimension_values(0)\n",
    "        yvals1 = el1.dimension_values(1)\n",
    "        yvals2 = el2.dimension_values(1)\n",
    "        \n",
    "        # Return new Element with subtracted y-values\n",
    "        # and new label\n",
    "        return el1.clone((xvals, yvals1-yvals2),\n",
    "                         vdims=[self.p.label])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To see what that looks like in action let's try it out by comparing the smoothed and original Curve."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "residual_dmap = residual(rolled_dmap * dmap)\n",
    "residual_dmap"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Since the stream we created is linked to one of the inputs of ``residual_dmap``, changing the stream values triggers updates both in the plot above and in our new residual plot."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "for i in range(20, 200, 20):\n",
    "    time.sleep(0.2)\n",
    "    stream.event(rolling_window=i)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Chaining operations\n",
    "\n",
    "Of course, since operations simply transform an Element in some way, operations can easily be chained. As a simple example, we will take the ``rolled_dmap`` and apply the ``datashading`` and ``dynspread`` operation to it to construct a datashaded version of the plot. As you'll be able to see, this concise specification defines a complex analysis pipeline that gets reapplied whenever you change the Symbol or interact with the plot -- whenever the data needs to be updated."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%opts RGB [width=600 height=400] {+framewise}\n",
    "overlay = dynspread(datashade(rolled_dmap)) * outlier_dmap\n",
    "(overlay + residual_dmap).cols(1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Visualizing the pipeline"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To understand what is going on we will write a small utility that traverses the output we just displayed above and visualizes each processing step leading up to it."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%opts RGB Curve [width=250 height=200]\n",
    "\n",
    "def traverse(obj, key, items=None):\n",
    "    items = [] if items is None else items\n",
    "    for inp in obj.callback.inputs[:1]:\n",
    "        label = inp.callback.operation.name if isinstance(inp.callback, hv.core.OperationCallable) else 'price'\n",
    "        if inp.last: items.append(inp[key].relabel(label))\n",
    "        if isinstance(inp, hv.DynamicMap): traverse(inp, key, items)\n",
    "    return list(hv.core.util.unique_iterator(items))[:-1]\n",
    "\n",
    "hv.Layout(traverse(overlay, 'AAPL')).cols(4)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Reading from right to left, the original price timeseries is first smoothed with a rolling window, then datashaded, then each pixel is spread to cover a larger area. As you can see, arbitrarily many standard or custom operations can be defined to capture even very complex workflows so that they can be replayed dynamically as needed interactively."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Onwards\n",
    "\n",
    "Next we will look at how we can handle [large datasets](./10_Working_with_Large_Datasets.ipynb)."
   ]
  }
 ],
 "metadata": {
  "language_info": {
   "name": "python",
   "pygments_lexer": "ipython3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
